﻿
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Core.RabbitMQBus.Log;
using Core.RabbitMQBus.Common;

namespace Core.RabbitMQBus.EventBus
{
    public class RabbitMqSubscriber : IRabbitMqSubscriber
    {
        private readonly ILogger<RabbitMqSubscriber> _rabbitMqSubscriberLogger;
        private readonly ILoggerHelper _loggerHelper;
        private readonly IConnectionChannel _connectionChannel;
        public RabbitMqSubscriber(IConnectionChannel connectionChannel, ILoggerHelper loggerHelper, ILogger<RabbitMqSubscriber> rabbitMqSubscriberLogger)
        {
            _loggerHelper = loggerHelper;
            _connectionChannel = connectionChannel;
            _rabbitMqSubscriberLogger = rabbitMqSubscriberLogger;
        }

        /// <summary>
        /// 订阅事件
        /// </summary>
        /// <returns></returns>
        public void Subscriber(string queueName)
        {
            var channel = _connectionChannel.GetChannel();
            //声明队列
            channel.QueueDeclare(queue: queueName, durable: true, false, false, null);
            //绑定交换机队列
            channel.QueueBind(queue: queueName, exchange: _connectionChannel.ExchangeName, routingKey: queueName);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicQos(0, 1000, false);
            consumer.Shutdown += OnConsumerShutdown;
            consumer.Registered += OnConsumerRegistered;
            consumer.Unregistered += OnConsumerUnregistered;
            consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.Default.GetString(body);
                var subscriberServiceMethod = RegisterSubscriberCache.GetSubscriberMethod(ea.RoutingKey);
                var obj = SwifterJsonSerializer.DeserializeObject(message, subscriberServiceMethod.MethodParameterType);
                var methodTask = (Task<ExecuteMethodResult>)subscriberServiceMethod.MethodInfo.Invoke(subscriberServiceMethod.SubscriberService,
                    new object[] { obj });
                if (methodTask.Result.IsSucceed)
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                else
                {
                    _loggerHelper.LogInfo("订阅执行异常", methodTask.Result.Message.ToString());
                    channel.BasicNack(ea.DeliveryTag, false, true);
                }
            };
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        /// <summary>
        /// 取消消费者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
        {
            _loggerHelper.LogInfo("取消消费者", e.ConsumerTag);
        }

        /// <summary>
        /// 消费者取消注册
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
        {
            _loggerHelper.LogInfo("消费者取消注册", e.ConsumerTag);
        }

        /// <summary>
        /// 消费者注册
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
        {
            _rabbitMqSubscriberLogger.LogInformation($"消费者已启动:{e.ConsumerTag}");
            _loggerHelper.LogInfo("消费者注册", e.ConsumerTag);
        }

        /// <summary>
        /// 消费者宕机
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
        {
            _loggerHelper.LogInfo("消费者宕机", e.ReplyText);
        }
    }
}
